Sample queue depth via observable gauge#81
Conversation
Migrate messaging.queue.depth from a synchronous gauge recorded on the message hot path to an observable gauge sampled by the telemetry SDK at each collection interval. The depth now stays fresh even when the queue is idle or stuck, instead of only being re-recorded once per processed message, and the per-message getQueueSize() call leaves the hot path. Requires utopia-php/telemetry 0.4.* for createObservableGauge. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Greptile SummaryMigrates
Confidence Score: 5/5Safe to merge — the refactor is self-contained, all three guard paths are tested, and no hot-path logic is affected. The change correctly moves queue-depth sampling out of the message hot path into a pull-model callback. The Publisher check, attribute set, and silent error handling are all preserved unchanged. Tests drive the new pull model correctly and cover the three relevant cases (publisher with sizes, non-publisher, and throwing publisher). No regressions are visible in the changed files. No files require special attention. Important Files Changed
Reviews (1): Last reviewed commit: "Drop explanatory comments" | Re-trigger Greptile |
Summary
Migrates the
messaging.queue.depthmetric from a synchronous gauge — recorded imperatively on the message hot path — to an observable gauge sampled by the telemetry SDK at each collection interval.Previously the depth was recorded via
Gauge::record()once at worker start and again in thefinallyafter every processed message. That coupled the sampling cadence to throughput:record()never fires and the gauge goes stale exactly when it matters most.listSizeround-trip ran on the hot path for every message.An observable gauge fixes both: the SDK invokes the registered
observe()callback on a fixed export cadence, decoupled from message processing, and uses last-value semantics.Changes
composer.json— bumputopia-php/telemetry0.2.*→0.4.*(addscreateObservableGauge).src/Queue/Server.php— swapGauge/createGaugeforObservableGauge/createObservableGauge; register anobserve()callback that readsgetQueueSize()and reports it with the same attributes (messaging.destination.name/namespace) and the samePublisher-check andtry/catchguards. RemoverecordQueueDepth()and both call sites.tests/.../ServerTelemetryTest.php— update to the pull model: assert against$telemetry->observableGaugesand invoke the registered callback via acollectObservations()helper (one invocation = one collection cycle).Not covered here
This does not by itself fix the Grafana double-counting on the dashboard — that's a fan-in problem (N workers/pods each reporting the same shared Redis depth under identical attributes), which needs the dashboard query to
max/avgacross series rather thansum(). The gauge also still only samples the pending queue, not the failed queue. Both are tracked as follow-ups.Test plan
composer run test(ServerTelemetryTest3/3 pass)composer run check(phpstan clean,--memory-limit=512M)composer run lint(pint passed)🤖 Generated with Claude Code